#include "config.h"

#include <time.h>
#include <sys/mman.h>
#include <libaio.h>
#include <errno.h>

#define DBG_SUBSYS S_LIBREPLICA

#include "replica.h"
#include "job_dock.h"
#include "sysy_lib.h"
#include "cluster.h"
#include "disk.h"
#include "squeue.h"
#include "fileinfo.h"
#include "bh.h"
#include "timer.h"
#include "net_global.h"
#include "core.h"
#include "bmap.h"
#include "sequence.h"
#include "dbg.h"
#include "../../storage/controller/ramdisk.h"

/* Short file-local alias for the replica server entry type used by the functions below. */
typedef replica_srv_entry_t entry_t;

/*
 * Resume every task parked on @ent's wait queue in a read-wait or
 * write-wait state, passing @retval to schedule_resume().
 *
 * The wait-queue spin lock is held for the whole walk. Entries with any
 * other op are left on the list untouched.
 */
static void __replica_srv_cancel_clock(entry_t *ent, int retval)
{
        struct list_head *cur, *next;
        int ret = sy_spin_lock(&ent->wq.lock);

        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        /* the _safe iterator is required: matching entries are unlinked in the loop */
        list_for_each_safe(cur, next, &ent->wq.wlist) {
                /* NOTE(review): cast assumes `hook` is the first member of wlist_t — confirm */
                wlist_t *waiter = (void *)cur;

                if (waiter->op != __OP_READ_WAIT__ && waiter->op != __OP_WRITE_WAIT__)
                        continue;

                list_del(&waiter->hook);
                schedule_resume(&waiter->task, retval, NULL);
        }

        sy_spin_unlock(&ent->wq.lock);
}

/*
 * Write the payload of one chunk to the disk location @loc.
 *
 * @pool   not used by this function.
 * @chkid  id of the chunk being written.
 * @loc    disk location passed to diskmd_aio_write().
 * @buf    payload; may be NULL, in which case NULL is handed to
 *         diskmd_aio_write() as-is and no ramdisk mirror is attempted.
 *
 * Returns 0 on success, otherwise the error code of the failing call.
 * Any temporary clone of @buf is released on both paths.
 */
static int __replica_srv_push_commit(const char *pool, const chkid_t *chkid, const diskloc_t *loc,
                                     const buffer_t *buf)
{
        int ret;
        buffer_t *tmp;
        buffer_t align;

        (void) pool;

        /*
         * Write straight from the caller's buffer when RDMA is configured,
         * when there is no payload, or when the buffer already passes
         * mbuffer_isalign(); otherwise write from a private clone.
         * (presumably the clone satisfies the alignment the AIO path needs
         * — confirm against mbuffer_clone)
         */
        if (likely(gloconf.rdma || buf == NULL || mbuffer_isalign(buf))) {
                tmp = (buffer_t *)buf;
        } else {
                mbuffer_clone(&align, buf);
                tmp = &align;
        }

        ret = diskmd_aio_write(chkid, loc, tmp, 0, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

#if RAMDISK_ENABLE
        /* buf == NULL is accepted above, so it must not be dereferenced here */
        if (buf != NULL) {
                ret = ramdisk_write(chkid, buf, buf->len, 0);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                DWARN("replica write "CHKID_FORMAT" offset:%d size:%d loc disk %d idx %d\n",
                                CHKID_ARG(chkid), 0, buf->len, loc->diskid, loc->idx);
        }
#endif

        /* only the clone is ours to free; the caller keeps ownership of buf */
        if (unlikely(tmp == &align))
                mbuffer_free(tmp);

        return 0;
err_ret:
        if (unlikely(tmp == &align))
                mbuffer_free(tmp);
        return ret;
}

/*
 * Store @buf as the content of @ent's chunk and record @vclock for it.
 *
 * If the disk holding the chunk is reported online, the data is written in
 * place at ent->loc. If it is offline (diskmd_online() says so, or fails
 * with ENODEV), a new location is allocated, the data is written there, the
 * chunk's mapping is switched to the new location, the old location is
 * deleted and ent->loc is updated.
 *
 * @ent          replica entry; ent->loc may be rewritten.
 * @fingerprint  not used by this function.
 * @vclock       clock value passed to clock_set().
 * @buf          payload forwarded to __replica_srv_push_commit().
 * @flags        not used by this function.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int __replica_srv_push__(entry_t *ent, uint64_t fingerprint, const vclock_t *vclock, const buffer_t *buf, int flags)
{
        /* cache_online is never changed from 1, so only disk_online drives the branch below */
        int ret, disk_online = 1, cache_online = 1, retry = 0;
        diskloc_t newloc;

        (void) fingerprint;
        (void) flags;

        ret = diskmd_online(ent->loc.diskid, &disk_online);
        if (unlikely(ret)) {
                /* a missing device is handled as "offline" rather than as a failure */
                if (unlikely(ret == ENODEV))
                        disk_online = 0;
                else
                        GOTO(err_ret, ret);
        }

        if (unlikely(!disk_online || !cache_online)) {
                /* NOTE(review): third argument is presumably a "dirty" marker set before relocating — confirm */
                ret = clock_set(&ent->chkid, vclock, 1);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

        retry:
                ret = diskmd_create_direct(ent->pool, &newloc, 1, NULL);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __replica_srv_push_commit(ent->pool, &ent->chkid, &newloc, buf);
                if (unlikely(ret)) {
                        if (ret == ENODEV) {
                                /*
                                 * Allocate another location and try again. After
                                 * three ENODEV retries the assertion fires instead
                                 * of returning an error.
                                 * NOTE(review): newloc from the failed attempt is not
                                 * released here — confirm whether it needs to be.
                                 */
                                YASSERT(retry < 3);
                                retry++;
                                goto retry;
                        } else
                                GOTO(err_ret, ret);
                }

                ret = disk_maping->setloc(&ent->chkid, &newloc);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                DBUG("delete "CHKID_FORMAT" loc "LOC_FORMAT"\n", CHKID_ARG(&ent->chkid), LOC_ARG(&ent->loc));

                // TODO core: ret == ENODEV
                /* ENODEV while deleting the old location is tolerated; anything else is not handled yet */
                ret = diskmd_delete(&ent->loc);
                if (unlikely(ret)) {
                        if (ret != ENODEV) {
                                UNIMPLEMENTED(__DUMP__);
                        }
                }

                ent->loc = newloc;

                ret = clock_set(&ent->chkid, vclock, 0);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                // TODO clock_set dismatch

                /* disk is online: overwrite in place, then record the clock */
                ret = __replica_srv_push_commit(ent->pool, &ent->chkid, &ent->loc, buf);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = clock_set(&ent->chkid, vclock, 0);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Handler passed to core_request() by replica_srv_push(); performs the push
 * of one chunk replica.
 *
 * Arguments arrive through @ap and are unpacked in the order
 * replica_srv_push() supplies them: pool, owner, chkid, parent, tier,
 * vclock, meta_version, buf, flags.
 *
 * Steps: obtain a fingerprint, create the replica (an existing one is
 * accepted), look up and write-lock its cache entry, write the data via
 * __replica_srv_push__(), record the new owner and vclock on the entry,
 * resume waiting tasks with EAGAIN, and update the stored meta_version.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int __replica_srv_push(va_list ap)
{
        int ret;
        mcache_entry_t *cent;
        entry_t *ent;
        uint64_t fingerprint;

        /* the va_arg sequence must stay in step with the argument list in replica_srv_push() */
        const char *pool = va_arg(ap, char *);
        const nid_t *owner = va_arg(ap, nid_t *);
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        const fileid_t *parent = va_arg(ap, fileid_t *);
        int tier = va_arg(ap, int);
        const vclock_t *vclock = va_arg(ap, vclock_t *);
        uint64_t meta_version = va_arg(ap, uint64_t);
        const buffer_t *buf = va_arg(ap, buffer_t *);
        int flags = va_arg(ap, int);

        va_end(ap);

#if ENABLE_CHUNK_DEBUG
        DINFO("push "CHKID_FORMAT" ownner %s clock %ju meta_version %ju\n",
              CHKID_ARG(chkid), network_rname(owner), vclock->clock, meta_version);
#else
        DBUG("push "CHKID_FORMAT" ownner %s clock %ju meta_version %ju\n",
              CHKID_ARG(chkid), network_rname(owner), vclock->clock, meta_version);
#endif

        ret = sequence_get(&fingerprint);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_BEGIN(0);
        
        /* EEXIST is not an error here: the push then goes to the already-present replica */
        ret = replica_srv_create_with_fingerprint(pool, owner, chkid, 1, parent,
                                                  tier, 0, NULL, meta_version, fingerprint, 1);
        if (unlikely(ret)) {
                if (ret == EEXIST) {
                        // TODO
                        DINFO("push "CHKID_FORMAT" from %s clock %ju meta_version %ju, exist\n",
                              CHKID_ARG(chkid), network_rname(owner), vclock->clock, meta_version);
                } else
                        GOTO(err_ret, ret);
        }

        ANALYSIS_QUEUE(0, 1000 * 100, "replica_srv_push1");
        
        /* from here on the entry reference must be dropped via replica_srv_release() */
        ret = replica_srv_get(chkid, &cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = mcache_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ent = cent->value;

        ANALYSIS_BEGIN(1);
        
        /*
         * NOTE(review): a failed wait-queue check logs the same "exist" text as
         * the EEXIST case above and then calls EXIT(EAGAIN) with the write lock
         * still held, instead of unwinding through err_lock. Presumably EXIT
         * terminates the process — confirm this is intended.
         */
        ret = __replica_srv_wqcheck(ent);
        if (unlikely(ret)) {
                DWARN("push "CHKID_FORMAT" from %s clock %ju meta_version %ju, exist\n",
                      CHKID_ARG(chkid), network_rname(owner), vclock->clock, meta_version);
                EXIT(EAGAIN);
        }
        
        ret = __replica_srv_push__(ent, fingerprint, vclock, buf, flags);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        ANALYSIS_QUEUE(1, 1000 * 100, "replica_srv_push2");
        ANALYSIS_BEGIN(2);
        
        /* data is written: record who pushed it and at which clock */
        ent->owner = *owner;
        ent->vclock = *vclock;

        /* resume tasks in read-wait/write-wait on this entry with EAGAIN */
        __replica_srv_cancel_clock(ent, EAGAIN);

        /* must update meta_version, otherwise will be droped by replica_unlink */
        ret = disk_maping->setmetaversion(chkid, meta_version);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        ANALYSIS_QUEUE(2, 1000 * 100, "replica_srv_push3");
        
        mcache_unlock(cent);
        replica_srv_release(cent);

        return 0;
err_lock:
        mcache_unlock(cent);
err_release:
        replica_srv_release(cent);
err_ret:
        return ret;
}

/*
 * Push one chunk replica: hands the request to core_request(), targeting the
 * core chosen by core_hash(chkid), where __replica_srv_push() does the work.
 *
 * All parameters are forwarded unchanged, in this order, as the variadic
 * arguments that __replica_srv_push() unpacks.
 *
 * Returns 0 on success, otherwise the error code from core_request().
 */
int replica_srv_push(const char *pool, const nid_t *owner, const chkid_t *chkid,
                     const fileid_t *parent, int tier, const vclock_t *vclock,
                     uint64_t meta_version, const buffer_t *buf, int flags)
{
        int ret = core_request(core_hash(chkid), -1, "replica_sync",
                               __replica_srv_push, pool, owner, chkid, parent,
                               tier, vclock, meta_version, buf, flags);

        if (likely(!ret))
                return 0;

        GOTO(err_ret, ret);
err_ret:
        return ret;
}
